{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Introduction to XGBoost Spark with GPU\n",
    "\n",
    "The goal of this notebook is to show how to train a XGBoost Model with Spark RAPIDS XGBoost library on GPUs. The dataset used with this notebook is derived from Fannie Mae’s Single-Family Loan Performance Data with all rights reserved by Fannie Mae. This processed dataset is redistributed with permission and consent from Fannie Mae. This notebook uses XGBoost to train 12-month mortgage loan delinquency prediction model .\n",
    "\n",
    "A few libraries required for this notebook:\n",
    "  1. cudf-cu11\n",
    "  2. xgboost\n",
    "  3. scikit-learn\n",
    "  4. numpy\n",
    "\n",
    "This notebook also illustrates the ease of porting a sample CPU based Spark xgboost4j code into GPU. There is no change required for running Spark XGBoost on GPU because both CPU and GPU call the same API. For CPU run, we need to vectorize the trained dataset before fitting data to classifier."
   ]
  },
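  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The libraries above can be installed with pip. A minimal sketch, assuming a CUDA 11 environment (the `cudf-cu11` package name depends on your CUDA version):\n",
    "\n",
    "```shell\n",
    "pip install cudf-cu11 xgboost scikit-learn numpy\n",
    "```"
   ]
  },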
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Import All Libraries"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "# if you pass/unpack the archive file and enable the environment\n",
    "# os.environ['PYSPARK_PYTHON'] = \"./environment/bin/python\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from xgboost.spark import SparkXGBClassifier, SparkXGBClassifierModel\n",
    "from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql.types import FloatType, IntegerType, StructField, StructType, DoubleType\n",
    "from pyspark.conf import SparkConf\n",
    "from time import time"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Besides CPU version requires two extra libraries.\n",
    "```Python\n",
    "from pyspark.ml.feature import VectorAssembler\n",
    "from pyspark.sql.functions import col\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Create Spark Session and Data Reader"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
      "22/11/24 06:14:05 WARN org.apache.spark.resource.ResourceUtils: The configuration of cores (exec = 4 task = 1, runnable tasks = 4) will result in wasted resources due to resource gpu limiting the number of runnable tasks per executor to: 1. Please adjust your configuration.\n",
      "22/11/24 06:14:06 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker\n",
      "22/11/24 06:14:06 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster\n",
      "22/11/24 06:14:06 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat\n",
      "22/11/24 06:14:06 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator\n",
      "22/11/24 06:14:07 WARN com.nvidia.spark.rapids.RapidsPluginUtils: RAPIDS Accelerator 25.02.1 using cudf 25.02.1.\n",
      "22/11/24 06:14:07 WARN com.nvidia.spark.rapids.RapidsPluginUtils: spark.rapids.sql.multiThreadedRead.numThreads is set to 20.\n",
      "22/11/24 06:14:07 WARN com.nvidia.spark.rapids.RapidsPluginUtils: RAPIDS Accelerator is enabled, to disable GPU support set `spark.rapids.sql.enabled` to false.\n",
      "22/11/24 06:14:07 WARN com.nvidia.spark.rapids.RapidsPluginUtils: spark.rapids.sql.explain is set to `NOT_ON_GPU`. Set it to 'NONE' to suppress the diagnostics logging about the query placement on the GPU.\n"
     ]
    }
   ],
   "source": [
    "SPARK_MASTER_URL = os.getenv(\"SPARK_MASTER_URL\", \"/your-url\")\n",
    "RAPIDS_JAR = os.getenv(\"RAPIDS_JAR\", \"/your-jar-path\")\n",
    "\n",
    "# You need to update with your real hardware resource \n",
    "driverMem = os.getenv(\"DRIVER_MEM\", \"10g\")\n",
    "executorMem = os.getenv(\"EXECUTOR_MEM\", \"10g\")\n",
    "pinnedPoolSize = os.getenv(\"PINNED_POOL_SIZE\", \"2g\")\n",
    "concurrentGpuTasks = os.getenv(\"CONCURRENT_GPU_TASKS\", \"2\")\n",
    "executorCores = int(os.getenv(\"EXECUTOR_CORES\", \"4\"))\n",
    "\n",
    "# Common spark settings\n",
    "conf = SparkConf()\n",
    "conf.setMaster(SPARK_MASTER_URL)\n",
    "conf.setAppName(\"Microbenchmark on GPU\")\n",
    "conf.set(\"spark.driver.memory\", driverMem)\n",
    "## The tasks will run on GPU memory, so there is no need to set a high host memory\n",
    "conf.set(\"spark.executor.memory\", executorMem)\n",
    "## The tasks will run on GPU cores, so there is no need to use many cpu cores\n",
    "conf.set(\"spark.executor.cores\", executorCores)\n",
    "\n",
    "# Plugin settings\n",
    "conf.set(\"spark.executor.resource.gpu.amount\", \"1\")\n",
    "conf.set(\"spark.rapids.sql.concurrentGpuTasks\", concurrentGpuTasks)\n",
    "conf.set(\"spark.rapids.memory.pinnedPool.size\", pinnedPoolSize)\n",
    "##############note: only support value=1 see https://github.com/dmlc/xgboost/blame/master/python-package/xgboost/spark/core.py#L370-L374\n",
    "conf.set(\"spark.task.resource.gpu.amount\", 1) \n",
    "# since pyspark and xgboost share the same GPU, we need to allocate some memory to xgboost to avoid GPU OOM while training \n",
    "conf.set(\"spark.rapids.memory.gpu.allocFraction\",\"0.6\")\n",
    "conf.set(\"spark.rapids.sql.enabled\", \"true\") \n",
    "conf.set(\"spark.plugins\", \"com.nvidia.spark.SQLPlugin\")\n",
    "conf.set(\"spark.sql.cache.serializer\",\"com.nvidia.spark.ParquetCachedBatchSerializer\")\n",
    "conf.set(\"spark.driver.extraClassPath\", RAPIDS_JAR)\n",
    "conf.set(\"spark.sql.execution.arrow.maxRecordsPerBatch\", 200000) \n",
    "conf.set(\"spark.executor.extraClassPath\", RAPIDS_JAR)\n",
    "conf.set(\"spark.jars\", RAPIDS_JAR)\n",
    "\n",
    "# if you pass/unpack the archive file and enable the environment\n",
    "# conf.set(\"spark.yarn.dist.archives\", \"your_pyspark_venv.tar.gz#environment\")\n",
    "\n",
    "# Create spark session\n",
    "spark = SparkSession.builder.config(conf=conf).getOrCreate()\n",
    "reader = spark.read"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Specify the Data Schema and Load the Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "label = 'delinquency_12'\n",
    "schema = StructType([\n",
    "    StructField('orig_channel', FloatType()),\n",
    "    StructField('first_home_buyer', FloatType()),\n",
    "    StructField('loan_purpose', FloatType()),\n",
    "    StructField('property_type', FloatType()),\n",
    "    StructField('occupancy_status', FloatType()),\n",
    "    StructField('property_state', FloatType()),\n",
    "    StructField('product_type', FloatType()),\n",
    "    StructField('relocation_mortgage_indicator', FloatType()),\n",
    "    StructField('seller_name', FloatType()),\n",
    "    StructField('mod_flag', FloatType()),\n",
    "    StructField('orig_interest_rate', FloatType()),\n",
    "    StructField('orig_upb', DoubleType()),\n",
    "    StructField('orig_loan_term', IntegerType()),\n",
    "    StructField('orig_ltv', FloatType()),\n",
    "    StructField('orig_cltv', FloatType()),\n",
    "    StructField('num_borrowers', FloatType()),\n",
    "    StructField('dti', FloatType()),\n",
    "    StructField('borrower_credit_score', FloatType()),\n",
    "    StructField('num_units', IntegerType()),\n",
    "    StructField('zip', IntegerType()),\n",
    "    StructField('mortgage_insurance_percent', FloatType()),\n",
    "    StructField('current_loan_delinquency_status', IntegerType()),\n",
    "    StructField('current_actual_upb', FloatType()),\n",
    "    StructField('interest_rate', FloatType()),\n",
    "    StructField('loan_age', FloatType()),\n",
    "    StructField('msa', FloatType()),\n",
    "    StructField('non_interest_bearing_upb', FloatType()),\n",
    "    StructField(label, IntegerType()),\n",
    "])\n",
    "features = [ x.name for x in schema if x.name != label ]\n",
    "\n",
    "# You need to update them to your real paths!\n",
    "dataRoot = os.getenv(\"DATA_ROOT\", \"/data\")\n",
    "train_path = dataRoot + \"/mortgage/output/train\"\n",
    "eval_path = dataRoot + \"/mortgage/output/eval\"\n",
    "\n",
    "data_format = 'parquet'\n",
    "has_header = 'true'\n",
    "if data_format == 'csv':\n",
    "    train_data = reader.schema(schema).option('header',has_header).csv(train_path)\n",
    "    trans_data = reader.schema(schema).option('header',has_header).csv(eval_path)\n",
    "else :\n",
    "    train_data = reader.load(train_path)\n",
    "    trans_data = reader.load(eval_path)\n",
    "  "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note on CPU version, vectorization is required before fitting data to classifier, which means you need to assemble all feature columns into one column.\n",
    "\n",
    "```Python\n",
    "def vectorize(data_frame):\n",
    "    to_floats = [ col(x.name).cast(FloatType()) for x in data_frame.schema ]\n",
    "    return (VectorAssembler()\n",
    "        .setInputCols(features)\n",
    "        .setOutputCol('features')\n",
    "        .transform(data_frame.select(to_floats))\n",
    "        .select(col('features'), col(label)))\n",
    "\n",
    "train_data = vectorize(train_data)\n",
    "trans_data = vectorize(trans_data)\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Create a XGBoostClassifier"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "params = { \n",
    "    \"tree_method\": \"gpu_hist\",\n",
    "    \"grow_policy\": \"depthwise\",\n",
    "    \"num_workers\": 1,\n",
    "    \"device\": \"cuda\",\n",
    "}\n",
    "params['features_col'] = features\n",
    "params['label_col'] = label\n",
    "    \n",
    "classifier = SparkXGBClassifier(**params)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The parameter `num_workers` should be set to the number of GPUs in Spark cluster for GPU version, while for CPU version it is usually equal to the number of the CPU cores.\n",
    "\n",
    "Concerning the tree method, GPU version only supports `gpu_hist` currently, while `hist` is designed and used here for CPU training.\n",
    "\n",
    "An example of CPU classifier:\n",
    "```\n",
    "classifier = SparkXGBClassifier(\n",
    "  feature_col=features,\n",
    "  label_col=label,  \n",
    "  num_workers=1024,\n",
    "  use_gpu=False,\n",
    ")\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Train the Data with Benchmark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "If features_cols param set, then features_col param is ignored.\n",
      "22/11/24 06:14:44 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.\n",
      "[Stage 12:>                                                         (0 + 1) / 1]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[06:15:10] WARNING: ../src/learner.cc:553: \n",
      "  If you are loading a serialized model (like pickle in Python, RDS in R) generated by\n",
      "  older XGBoost, please export the model by calling `Booster.save_model` from that version\n",
      "  first, then load it back in current version. See:\n",
      "\n",
      "    https://xgboost.readthedocs.io/en/latest/tutorials/saving_model.html\n",
      "\n",
      "  for more details about differences between saving model and serializing.\n",
      "\n",
      "Training takes 28.6 seconds\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\r",
      "                                                                                \r",
      "/home/yuali_nvidia_com/.local/lib/python3.8/site-packages/xgboost/sklearn.py:808: UserWarning: Loading a native XGBoost model with Scikit-Learn interface.\n",
      "  warnings.warn(\"Loading a native XGBoost model with Scikit-Learn interface.\")\n"
     ]
    }
   ],
   "source": [
    "def with_benchmark(phrase, action):\n",
    "    start = time()\n",
    "    result = action()\n",
    "    end = time()\n",
    "    print('{} takes {} seconds'.format(phrase, round(end - start, 2)))\n",
    "    return result\n",
    "model = with_benchmark('Training', lambda: classifier.fit(train_data))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Save and Reload the Model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "If features_cols param set, then features_col param is ignored.\n",
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "model.write().overwrite().save(dataRoot + '/model/mortgage')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "loaded_model = SparkXGBClassifierModel().load(dataRoot + '/model/mortgage')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Transformation and Show Result Sample"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "22/11/24 06:15:13 WARN com.nvidia.spark.rapids.GpuOverrides: \n",
      "!Exec <ProjectExec> cannot run on GPU because not all expressions can be replaced; unsupported data types in output: org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 [rawPrediction#209, probability#275]\n",
      "  @Expression <AttributeReference> orig_channel#56 could run on GPU\n",
      "  @Expression <AttributeReference> first_home_buyer#57 could run on GPU\n",
      "  @Expression <AttributeReference> loan_purpose#58 could run on GPU\n",
      "  @Expression <AttributeReference> property_type#59 could run on GPU\n",
      "  @Expression <AttributeReference> occupancy_status#60 could run on GPU\n",
      "  @Expression <AttributeReference> property_state#61 could run on GPU\n",
      "  @Expression <AttributeReference> product_type#62 could run on GPU\n",
      "  @Expression <AttributeReference> relocation_mortgage_indicator#63 could run on GPU\n",
      "  @Expression <AttributeReference> seller_name#64 could run on GPU\n",
      "  @Expression <AttributeReference> mod_flag#65 could run on GPU\n",
      "  @Expression <AttributeReference> orig_interest_rate#66 could run on GPU\n",
      "  @Expression <AttributeReference> orig_upb#67 could run on GPU\n",
      "  @Expression <AttributeReference> orig_loan_term#68 could run on GPU\n",
      "  @Expression <AttributeReference> orig_ltv#69 could run on GPU\n",
      "  @Expression <AttributeReference> orig_cltv#70 could run on GPU\n",
      "  @Expression <AttributeReference> num_borrowers#71 could run on GPU\n",
      "  @Expression <AttributeReference> dti#72 could run on GPU\n",
      "  @Expression <AttributeReference> borrower_credit_score#73 could run on GPU\n",
      "  @Expression <AttributeReference> num_units#74 could run on GPU\n",
      "  @Expression <AttributeReference> zip#75 could run on GPU\n",
      "  @Expression <AttributeReference> mortgage_insurance_percent#76 could run on GPU\n",
      "  @Expression <AttributeReference> current_loan_delinquency_status#77 could run on GPU\n",
      "  @Expression <AttributeReference> current_actual_upb#78 could run on GPU\n",
      "  @Expression <AttributeReference> interest_rate#79 could run on GPU\n",
      "  @Expression <AttributeReference> loan_age#80 could run on GPU\n",
      "  @Expression <AttributeReference> msa#81 could run on GPU\n",
      "  @Expression <AttributeReference> non_interest_bearing_upb#82 could run on GPU\n",
      "  @Expression <AttributeReference> delinquency_12#83 could run on GPU\n",
      "  !Expression <Alias> UDF(pythonUDF0#342.rawPrediction) AS rawPrediction#209 cannot run on GPU because input expression ScalaUDF UDF(pythonUDF0#342.rawPrediction) (org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 is not supported); expression Alias UDF(pythonUDF0#342.rawPrediction) AS rawPrediction#209 produces an unsupported type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7\n",
      "    !Expression <ScalaUDF> UDF(pythonUDF0#342.rawPrediction) cannot run on GPU because expression ScalaUDF UDF(pythonUDF0#342.rawPrediction) produces an unsupported type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7; neither UDF implemented by class org.apache.spark.ml.functions$$$Lambda$3898/645590696 provides a GPU implementation, nor the conf `spark.rapids.sql.rowBasedUDF.enabled` is enabled\n",
      "      @Expression <GetStructField> pythonUDF0#342.rawPrediction could run on GPU\n",
      "        @Expression <AttributeReference> pythonUDF0#342 could run on GPU\n",
      "  @Expression <Alias> pythonUDF0#342.prediction AS prediction#243 could run on GPU\n",
      "    @Expression <GetStructField> pythonUDF0#342.prediction could run on GPU\n",
      "      @Expression <AttributeReference> pythonUDF0#342 could run on GPU\n",
      "  !Expression <Alias> UDF(pythonUDF0#342.probability) AS probability#275 cannot run on GPU because input expression ScalaUDF UDF(pythonUDF0#342.probability) (org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 is not supported); expression Alias UDF(pythonUDF0#342.probability) AS probability#275 produces an unsupported type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7\n",
      "    !Expression <ScalaUDF> UDF(pythonUDF0#342.probability) cannot run on GPU because neither UDF implemented by class org.apache.spark.ml.functions$$$Lambda$3898/645590696 provides a GPU implementation, nor the conf `spark.rapids.sql.rowBasedUDF.enabled` is enabled; expression ScalaUDF UDF(pythonUDF0#342.probability) produces an unsupported type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7\n",
      "      @Expression <GetStructField> pythonUDF0#342.probability could run on GPU\n",
      "        @Expression <AttributeReference> pythonUDF0#342 could run on GPU\n",
      "\n",
      "22/11/24 06:15:13 WARN com.nvidia.spark.rapids.GpuOverrides: \n",
      "!Exec <InMemoryTableScanExec> cannot run on GPU because unsupported data types in output: org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 [rawPrediction, probability]; not all expressions can be replaced; unsupported data types in output: org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 [rawPrediction#209, probability#275]\n",
      "  @Expression <AttributeReference> orig_channel#56 could run on GPU\n",
      "  @Expression <AttributeReference> first_home_buyer#57 could run on GPU\n",
      "  @Expression <AttributeReference> loan_purpose#58 could run on GPU\n",
      "  @Expression <AttributeReference> property_type#59 could run on GPU\n",
      "  @Expression <AttributeReference> occupancy_status#60 could run on GPU\n",
      "  @Expression <AttributeReference> property_state#61 could run on GPU\n",
      "  @Expression <AttributeReference> product_type#62 could run on GPU\n",
      "  @Expression <AttributeReference> relocation_mortgage_indicator#63 could run on GPU\n",
      "  @Expression <AttributeReference> seller_name#64 could run on GPU\n",
      "  @Expression <AttributeReference> mod_flag#65 could run on GPU\n",
      "  @Expression <AttributeReference> orig_interest_rate#66 could run on GPU\n",
      "  @Expression <AttributeReference> orig_upb#67 could run on GPU\n",
      "  @Expression <AttributeReference> orig_loan_term#68 could run on GPU\n",
      "  @Expression <AttributeReference> orig_ltv#69 could run on GPU\n",
      "  @Expression <AttributeReference> orig_cltv#70 could run on GPU\n",
      "  @Expression <AttributeReference> num_borrowers#71 could run on GPU\n",
      "  @Expression <AttributeReference> dti#72 could run on GPU\n",
      "  @Expression <AttributeReference> borrower_credit_score#73 could run on GPU\n",
      "  @Expression <AttributeReference> num_units#74 could run on GPU\n",
      "  @Expression <AttributeReference> zip#75 could run on GPU\n",
      "  @Expression <AttributeReference> mortgage_insurance_percent#76 could run on GPU\n",
      "  @Expression <AttributeReference> current_loan_delinquency_status#77 could run on GPU\n",
      "  @Expression <AttributeReference> current_actual_upb#78 could run on GPU\n",
      "  @Expression <AttributeReference> interest_rate#79 could run on GPU\n",
      "  @Expression <AttributeReference> loan_age#80 could run on GPU\n",
      "  @Expression <AttributeReference> msa#81 could run on GPU\n",
      "  @Expression <AttributeReference> non_interest_bearing_upb#82 could run on GPU\n",
      "  @Expression <AttributeReference> delinquency_12#83 could run on GPU\n",
      "  !Expression <AttributeReference> rawPrediction#209 cannot run on GPU because expression AttributeReference rawPrediction#209 produces an unsupported type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7\n",
      "  @Expression <AttributeReference> prediction#243 could run on GPU\n",
      "  !Expression <AttributeReference> probability#275 cannot run on GPU because expression AttributeReference probability#275 produces an unsupported type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7\n",
      "\n",
      "22/11/24 06:15:28 WARN com.nvidia.spark.rapids.GpuOverrides:                    \n",
      "!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it\n",
      "  @Partitioning <SinglePartition$> could run on GPU\n",
      "  !Exec <ProjectExec> cannot run on GPU because not all expressions can be replaced; unsupported data types in input: org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 [probability#275, rawPrediction#209]\n",
      "    @Expression <Alias> cast(delinquency_12#83 as string) AS delinquency_12#971 could run on GPU\n",
      "      @Expression <Cast> cast(delinquency_12#83 as string) could run on GPU\n",
      "        @Expression <AttributeReference> delinquency_12#83 could run on GPU\n",
      "    @Expression <Alias> cast(rawPrediction#209 as string) AS rawPrediction#972 could run on GPU\n",
      "      !Expression <Cast> cast(rawPrediction#209 as string) cannot run on GPU because Cast from org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 to StringType is not supported\n",
      "        !Expression <AttributeReference> rawPrediction#209 cannot run on GPU because expression AttributeReference rawPrediction#209 produces an unsupported type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7\n",
      "    @Expression <Alias> cast(probability#275 as string) AS probability#973 could run on GPU\n",
      "      !Expression <Cast> cast(probability#275 as string) cannot run on GPU because Cast from org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 to StringType is not supported\n",
      "        !Expression <AttributeReference> probability#275 cannot run on GPU because expression AttributeReference probability#275 produces an unsupported type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7\n",
      "    @Expression <Alias> cast(prediction#243 as string) AS prediction#974 could run on GPU\n",
      "      @Expression <Cast> cast(prediction#243 as string) could run on GPU\n",
      "        @Expression <AttributeReference> prediction#243 could run on GPU\n",
      "    !Exec <InMemoryTableScanExec> cannot run on GPU because unsupported data types in output: org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 [rawPrediction, probability]; unsupported data types in output: org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 [probability#275, rawPrediction#209]; not all expressions can be replaced\n",
      "      @Expression <AttributeReference> delinquency_12#83 could run on GPU\n",
      "      @Expression <AttributeReference> prediction#243 could run on GPU\n",
      "      !Expression <AttributeReference> probability#275 cannot run on GPU because expression AttributeReference probability#275 produces an unsupported type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7\n",
      "      !Expression <AttributeReference> rawPrediction#209 cannot run on GPU because expression AttributeReference rawPrediction#209 produces an unsupported type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7\n",
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Transformation takes 15.62 seconds\n",
      "+--------------+--------------------+--------------------+----------+\n",
      "|delinquency_12|       rawPrediction|         probability|prediction|\n",
      "+--------------+--------------------+--------------------+----------+\n",
      "|             0|[8.84631538391113...|[0.99985611438751...|       0.0|\n",
      "|             0|[9.41864871978759...|[0.99991881847381...|       0.0|\n",
      "|             0|[9.41864871978759...|[0.99991881847381...|       0.0|\n",
      "|             0|[9.41864871978759...|[0.99991881847381...|       0.0|\n",
      "|             0|[8.84631538391113...|[0.99985611438751...|       0.0|\n",
      "+--------------+--------------------+--------------------+----------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "def transform():\n",
    "    result = loaded_model.transform(trans_data).cache()\n",
    "    result.foreachPartition(lambda _: None)\n",
    "    return result\n",
    "result = with_benchmark('Transformation', transform)\n",
    "result.select(label, 'rawPrediction', 'probability', 'prediction').show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Evaluation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "def check_classification_accuracy(data_frame, label):\n",
    "    accuracy = (MulticlassClassificationEvaluator()\n",
    "                .setLabelCol(label)\n",
    "                .evaluate(data_frame))\n",
    "    print('-' * 100)\n",
    "    print('Accuracy is ' + str(accuracy))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "22/11/24 06:15:28 WARN com.nvidia.spark.rapids.GpuOverrides: \n",
      "! <DeserializeToObjectExec> cannot run on GPU because not all expressions can be replaced; GPU does not currently support the operator class org.apache.spark.sql.execution.DeserializeToObjectExec\n",
      "  ! <CreateExternalRow> createexternalrow(prediction#243, delinquency_12#1450, 1.0#1449, newInstance(class org.apache.spark.ml.linalg.VectorUDT).deserialize, StructField(prediction,DoubleType,true), StructField(delinquency_12,DoubleType,true), StructField(1.0,DoubleType,false), StructField(probability,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true)) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.CreateExternalRow\n",
      "    @Expression <AttributeReference> prediction#243 could run on GPU\n",
      "    @Expression <AttributeReference> delinquency_12#1450 could run on GPU\n",
      "    @Expression <AttributeReference> 1.0#1449 could run on GPU\n",
      "    ! <Invoke> newInstance(class org.apache.spark.ml.linalg.VectorUDT).deserialize cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.Invoke\n",
      "      ! <NewInstance> newInstance(class org.apache.spark.ml.linalg.VectorUDT) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.objects.NewInstance\n",
      "      !Expression <AttributeReference> probability#275 cannot run on GPU because expression AttributeReference probability#275 produces an unsupported type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7\n",
      "  !Expression <AttributeReference> obj#1455 cannot run on GPU because expression AttributeReference obj#1455 produces an unsupported type ObjectType(interface org.apache.spark.sql.Row)\n",
      "  !Exec <ProjectExec> cannot run on GPU because unsupported data types in output: org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 [probability#275]; unsupported data types in input: org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 [probability#275]; not all expressions can be replaced\n",
      "    @Expression <AttributeReference> prediction#243 could run on GPU\n",
      "    @Expression <Alias> cast(delinquency_12#83 as double) AS delinquency_12#1450 could run on GPU\n",
      "      @Expression <Cast> cast(delinquency_12#83 as double) could run on GPU\n",
      "        @Expression <AttributeReference> delinquency_12#83 could run on GPU\n",
      "    @Expression <Alias> 1.0 AS 1.0#1449 could run on GPU\n",
      "      @Expression <Literal> 1.0 could run on GPU\n",
      "    !Expression <AttributeReference> probability#275 cannot run on GPU because expression AttributeReference probability#275 produces an unsupported type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7\n",
      "    !Exec <InMemoryTableScanExec> cannot run on GPU because unsupported data types in output: org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 [rawPrediction, probability]; unsupported data types in output: org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 [probability#275]; not all expressions can be replaced\n",
      "      @Expression <AttributeReference> delinquency_12#83 could run on GPU\n",
      "      @Expression <AttributeReference> prediction#243 could run on GPU\n",
      "      !Expression <AttributeReference> probability#275 cannot run on GPU because expression AttributeReference probability#275 produces an unsupported type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7\n",
      "\n",
      "[Stage 19:>                                                         (0 + 1) / 1]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "----------------------------------------------------------------------------------------------------\n",
      "Accuracy is 1.0\n",
      "Evaluation takes 2.29 seconds\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\r",
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "with_benchmark('Evaluation', lambda: check_classification_accuracy(result, label))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.stop()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
